ARIMA (Autoregressive Integrated Moving Average)#

ARIMA models a (possibly non-stationary) univariate time series by:

  1. differencing it until it is approximately stationary (the I),

  2. fitting an ARMA model to the differenced series (the AR + MA),

  3. integrating forecasts back to the original scale.

What you’ll learn#

  • what “integration” means in ARIMA (differencing)

  • why differencing helps with trends / unit roots (non-stationarity)

  • how to choose \((p, d, q)\) (intuition + ACF/PACF)

  • a low-level NumPy implementation (fit + forecast + uncertainty)

  • Plotly visuals: raw vs. differenced series, forecasts, and confidence intuition

import numpy as np
import pandas as pd
import plotly.graph_objects as go
import os
import plotly.io as pio

from plotly.subplots import make_subplots
from scipy.optimize import minimize
from statsmodels.tsa.stattools import adfuller

pio.templates.default = "plotly_white"
pio.renderers.default = os.environ.get("PLOTLY_RENDERER", "notebook")

np.set_printoptions(precision=4, suppress=True)
rng = np.random.default_rng(1)

ARIMA in one equation (with backshift notation)#

Let \(B\) be the backshift operator: \(B y_t = y_{t-1}\). Define the differencing operator \(\nabla = 1 - B\) (written \(\Delta\) in the sections below).

An ARIMA\((p,d,q)\) model can be written compactly as:

\[ \phi(B)\, \nabla^d y_t = c + \theta(B)\, \varepsilon_t, \qquad \varepsilon_t \sim \mathrm{WN}(0,\sigma^2) \]

where:

  • \(\phi(B) = 1 - \phi_1 B - \cdots - \phi_p B^p\) (AR polynomial)

  • \(\theta(B) = 1 + \theta_1 B + \cdots + \theta_q B^q\) (MA polynomial)

  • \(\nabla^d y_t = (1-B)^d y_t\) is the \(d\)-th differenced series

  • \(\varepsilon_t\) is (ideally) white noise (uncorrelated, constant variance)

Interpretation:

  • AR\((p)\): “today relates to the last \(p\) values”

  • I\((d)\): “work on changes, not levels, to remove trends / unit roots”

  • MA\((q)\): “today relates to the last \(q\) shocks / errors”

Integration = differencing (precisely)#

ARIMA’s “Integrated” part is historical terminology: instead of modeling \(y_t\) directly, we model its differences, then sum (integrate) back to get predictions on the original scale.
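A minimal sketch of why this round trip works: `np.diff` and `np.cumsum` are inverse operations, provided you keep the initial level as the integration constant.

```python
import numpy as np

rng = np.random.default_rng(0)
y = rng.normal(size=100).cumsum() + 50.0  # some level series

# Difference, then "integrate" (cumulative-sum) back, anchored at y[0]
dy = np.diff(y)
y_back = y[0] + np.concatenate([[0.0], np.cumsum(dy)])

print(np.allclose(y_back, y))  # True
```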

First difference#

\[ \Delta y_t = y_t - y_{t-1} = (1 - B) y_t. \]

Second difference#

\[ \Delta^2 y_t = \Delta(\Delta y_t) = y_t - 2y_{t-1} + y_{t-2}. \]

\(d\)-th difference (general form)#

\[ \Delta^d y_t = (1 - B)^d y_t = \sum_{k=0}^{d} (-1)^k \binom{d}{k} y_{t-k}. \]

This is a discrete analog of differentiation: each differencing step reduces the degree of a polynomial trend by one (a linear trend becomes a constant, a quadratic trend becomes linear, and so on).
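Two quick checks of the formula above: `np.diff(y, n=d)` matches the explicit binomial expansion of \((1-B)^d\), and the second difference of a quadratic trend is constant.

```python
import numpy as np
from math import comb

rng = np.random.default_rng(0)
y = rng.normal(size=50)
d = 3

# d-th difference via np.diff ...
via_diff = np.diff(y, n=d)

# ... and via the binomial expansion of (1 - B)^d
coeffs = np.array([(-1) ** k * comb(d, k) for k in range(d + 1)])
via_binom = np.array([np.dot(coeffs, y[t - np.arange(d + 1)]) for t in range(d, len(y))])
print(np.allclose(via_diff, via_binom))  # True

# Each difference removes one polynomial degree: Δ² of t² is the constant 2
t = np.arange(20.0)
print(np.allclose(np.diff(t**2, n=2), 2.0))  # True
```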

Why ARIMA handles non-stationarity#

ARMA models assume (weak) stationarity: mean and autocovariances are time-invariant. Many real series aren’t stationary because they contain a stochastic trend (a unit root), e.g. a random walk.

Unit-root example (random walk)#

\[ y_t = y_{t-1} + \varepsilon_t \quad \Rightarrow \quad \Delta y_t = \varepsilon_t. \]

The level series \(y_t\) is non-stationary, but the increment series \(\Delta y_t\) is stationary (white noise). ARIMA uses \(d\) differences to reduce the series to something ARMA can model.
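The random-walk case can be verified in two lines: differencing a cumulative sum recovers the white-noise increments exactly.

```python
import numpy as np

rng = np.random.default_rng(0)
eps = rng.normal(size=1_000)

# Random walk: y_t = y_{t-1} + eps_t (taking y_0 = eps_0)
y = np.cumsum(eps)

# The first difference is exactly the white-noise increment series
print(np.allclose(np.diff(y), eps[1:]))  # True
```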

Deterministic trend intuition#

If \(y_t = a + bt + u_t\) with stationary \(u_t\), then:

\[ \Delta y_t = b + (u_t - u_{t-1}), \]

so differencing removes the linear trend component.

Practical warning: over-differencing#

If you difference too much (choose \(d\) larger than needed), you can introduce extra moving-average structure, increase variance, and harm forecasts.
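The extra MA structure is easy to demonstrate. If the series is already white noise (so \(d=0\) is correct), an unnecessary difference produces \(\Delta x_t = \varepsilon_t - \varepsilon_{t-1}\): a non-invertible MA(1) whose variance is \(2\sigma^2\) and whose lag-1 autocorrelation is \(-0.5\).

```python
import numpy as np

rng = np.random.default_rng(0)
eps = rng.normal(size=200_000)  # already stationary: d = 0 is correct

dx = np.diff(eps)  # one unnecessary difference

x1 = dx - dx.mean()
lag1 = np.dot(x1[1:], x1[:-1]) / np.dot(x1, x1)

print(round(dx.var() / eps.var(), 2))  # ≈ 2.0: variance doubles
print(round(lag1, 2))                  # ≈ -0.5: artificial MA(1) structure
```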

def simulate_arma(
    n: int,
    *,
    c: float,
    phi: np.ndarray,
    theta: np.ndarray,
    sigma: float,
    burn_in: int = 300,
    rng: np.random.Generator,
) -> np.ndarray:
    """Simulate a stationary ARMA(p,q): y_t = c + sum(phi_i y_{t-i}) + eps_t + sum(theta_j eps_{t-j})."""
    phi = np.asarray(phi, dtype=float)
    theta = np.asarray(theta, dtype=float)
    p, q = len(phi), len(theta)

    m = n + burn_in
    eps = rng.normal(0.0, sigma, size=m)
    y = np.zeros(m)

    for t in range(m):
        ar = 0.0
        for i in range(1, p + 1):
            if t - i >= 0:
                ar += phi[i - 1] * y[t - i]

        ma = 0.0
        for j in range(1, q + 1):
            if t - j >= 0:
                ma += theta[j - 1] * eps[t - j]

        y[t] = c + ar + eps[t] + ma

    return y[burn_in:]


# Build a non-stationary series by integrating a stationary ARMA increment process.
n = 320
dates = pd.date_range("2020-01-01", periods=n, freq="D")

# Increment process: ARMA(2,1)
phi_true = np.array([0.6, -0.2])
theta_true = np.array([0.5])
drift = 0.10
sigma = 1.0

dy = simulate_arma(n, c=drift, phi=phi_true, theta=theta_true, sigma=sigma, rng=rng)
y = 100 + np.cumsum(dy)  # integrated => non-stationary level series

d1 = np.diff(y, n=1)

raw_p = adfuller(y)[1]
diff_p = adfuller(d1)[1]
print(f"ADF p-value (raw y):        {raw_p:.3g}")
print(f"ADF p-value (diff Δy):      {diff_p:.3g}")
ADF p-value (raw y):        0.824
ADF p-value (diff Δy):      2.23e-12
fig = make_subplots(
    rows=2,
    cols=1,
    shared_xaxes=True,
    vertical_spacing=0.08,
    subplot_titles=("Raw series (level)", "First difference (Δy)")
)

fig.add_trace(go.Scatter(x=dates, y=y, mode="lines", name="y"), row=1, col=1)
fig.add_trace(go.Scatter(x=dates[1:], y=d1, mode="lines", name="Δy"), row=2, col=1)

fig.update_yaxes(title_text="Value", row=1, col=1)
fig.update_yaxes(title_text="Value", row=2, col=1)
fig.update_xaxes(title_text="Time", row=2, col=1)
fig.update_layout(height=550, title="Raw vs. differenced series")

fig.show()

Choosing \((p, d, q)\) (intuition + common workflow)#

1) Choose \(d\) (integration order)#

  • Use plots: trend / changing level often suggests differencing.

  • Use unit-root / stationarity tests (imperfect but useful): ADF, KPSS.

  • Difference the minimum number of times needed to make the series look approximately stationary.

Rules of thumb:

  • Many business series: \(d \in \{0,1\}\).

  • If the series has strong seasonality: you often need seasonal differencing too (that’s SARIMA).

2) Choose \(p\) and \(q\) on the differenced series#

On \(x_t = \Delta^d y_t\) (now assumed stationary), use:

  • PACF shape to guess \(p\) (AR order)

  • ACF shape to guess \(q\) (MA order)

  • then refine with AIC/BIC or rolling validation.
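For the AIC refinement step, a CSS-flavored approximation can be computed straight from the residual sum of squares (this is a sketch, not the exact likelihood-based AIC; `aic_css` is a hypothetical helper):

```python
import numpy as np

def aic_css(sse: float, n: int, p: int, q: int) -> float:
    """Approximate AIC from a CSS fit: n * ln(SSE / n) + 2k."""
    k = 1 + p + q + 1  # constant + AR coeffs + MA coeffs + noise variance
    return n * np.log(sse / n) + 2 * k

# Lower is better; compare candidate (p, q) orders on the same data.
print(round(aic_css(sse=100.0, n=100, p=1, q=1), 2))  # n*ln(1) + 2*4 = 8.0
```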

Heuristics (idealized patterns):

  • AR\((p)\): PACF cuts off after lag \(p\), ACF tails off

  • MA\((q)\): ACF cuts off after lag \(q\), PACF tails off

  • ARMA: both tail off
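The MA cutoff pattern can be checked numerically: for an MA(1) process, the theoretical ACF is \(\rho_1 = \theta / (1 + \theta^2)\) and \(\rho_k = 0\) for \(k > 1\). A minimal simulation:

```python
import numpy as np

rng = np.random.default_rng(0)
theta = 0.6
n = 200_000

# MA(1): x_t = eps_t + theta * eps_{t-1}
eps = rng.normal(size=n + 1)
x = eps[1:] + theta * eps[:-1]

def sample_acf(x: np.ndarray, k: int) -> float:
    x = x - x.mean()
    return float(np.dot(x[k:], x[:-k]) / np.dot(x, x))

rho1_theory = theta / (1 + theta**2)
print(round(sample_acf(x, 1), 2), round(rho1_theory, 2))  # both ≈ 0.44
print(round(sample_acf(x, 2), 2))                         # ≈ 0: cutoff after lag 1
```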

def acf_np(x: np.ndarray, nlags: int) -> np.ndarray:
    x = np.asarray(x, dtype=float)
    x = x - x.mean()
    denom = np.dot(x, x)
    out = np.empty(nlags + 1)
    out[0] = 1.0
    for k in range(1, nlags + 1):
        out[k] = np.dot(x[k:], x[:-k]) / denom
    return out


def pacf_ols(x: np.ndarray, nlags: int) -> np.ndarray:
    """Partial autocorrelation via OLS: pacf(k) = coeff on lag k in an AR(k) regression."""
    x = np.asarray(x, dtype=float)
    x = x - x.mean()
    out = np.empty(nlags + 1)
    out[0] = 1.0

    for k in range(1, nlags + 1):
        y_reg = x[k:]
        X_lags = np.column_stack([x[k - i - 1 : -(i + 1)] for i in range(k)])
        X = np.column_stack([np.ones(len(y_reg)), X_lags])
        beta, *_ = np.linalg.lstsq(X, y_reg, rcond=None)
        out[k] = beta[-1]

    return out


nlags = 24
acf_vals = acf_np(d1, nlags=nlags)
pacf_vals = pacf_ols(d1, nlags=nlags)
lags = np.arange(nlags + 1)

fig = make_subplots(rows=1, cols=2, subplot_titles=("ACF (Δy)", "PACF (Δy)"), horizontal_spacing=0.12)
fig.add_trace(go.Bar(x=lags, y=acf_vals, name="ACF"), row=1, col=1)
fig.add_trace(go.Bar(x=lags, y=pacf_vals, name="PACF"), row=1, col=2)

fig.update_xaxes(title_text="Lag", row=1, col=1)
fig.update_xaxes(title_text="Lag", row=1, col=2)
fig.update_yaxes(title_text="Correlation", row=1, col=1)
fig.update_yaxes(title_text="Correlation", row=1, col=2)
fig.update_layout(height=350, title="Autocorrelation diagnostics on differenced series")
fig.show()

Low-level NumPy ARIMA implementation (educational)#

We’ll implement a simple ARIMA\((p,d,q)\) fit using conditional sum of squares (CSS):

  1. Difference: \(x_t = \Delta^d y_t\)

  2. Fit ARMA on \(x_t\) by minimizing squared one-step-ahead residuals

ARMA residual recursion#

For \(x_t\) modeled as:

\[ x_t = c + \sum_{i=1}^p \phi_i x_{t-i} + \varepsilon_t + \sum_{j=1}^q \theta_j \varepsilon_{t-j}, \]

the residuals satisfy:

\[ \varepsilon_t = x_t - c - \sum_{i=1}^p \phi_i x_{t-i} - \sum_{j=1}^q \theta_j \varepsilon_{t-j}. \]

We minimize \(\sum_t \varepsilon_t^2\) over \((c, \phi, \theta)\).

This is not a full maximum-likelihood fit (which handles initial conditions and constraints more carefully), but it’s a good “from scratch” view of how ARIMA works.
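A sanity check of the residual recursion: if we simulate an ARMA(1,1) using zero pre-sample values (the same convention the CSS recursion uses) and then run the recursion with the true parameters, it recovers the shocks exactly.

```python
import numpy as np

rng = np.random.default_rng(0)
n, c, phi, theta = 200, 0.3, 0.6, 0.4
eps = rng.normal(size=n)

# Simulate ARMA(1,1) with zero pre-sample values
x = np.zeros(n)
for t in range(n):
    x_prev = x[t - 1] if t > 0 else 0.0
    e_prev = eps[t - 1] if t > 0 else 0.0
    x[t] = c + phi * x_prev + eps[t] + theta * e_prev

# Residual recursion with the true parameters recovers eps exactly
eps_hat = np.zeros(n)
for t in range(n):
    x_prev = x[t - 1] if t > 0 else 0.0
    e_prev = eps_hat[t - 1] if t > 0 else 0.0
    eps_hat[t] = x[t] - c - phi * x_prev - theta * e_prev

print(np.allclose(eps_hat, eps))  # True
```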

def difference_levels(y: np.ndarray, d: int) -> tuple[np.ndarray, list[float]]:
    """Return Δ^d y and the last values needed to invert the differencing."""
    y = np.asarray(y, dtype=float)
    if d < 0:
        raise ValueError("d must be >= 0")
    if d == 0:
        return y.copy(), []

    levels = [y]
    for _ in range(d):
        levels.append(levels[-1][1:] - levels[-1][:-1])

    last_values = [levels[i][-1] for i in range(d)]
    return levels[-1], last_values


def undifference_forecast(diff_forecast: np.ndarray, last_values: list[float]) -> np.ndarray:
    """Invert differencing for forecasts; last_values are [last y, last Δy, ..., last Δ^(d-1)y]."""
    diff_forecast = np.asarray(diff_forecast, dtype=float)
    d = len(last_values)
    if d == 0:
        return diff_forecast.copy()

    states = np.array(last_values, dtype=float)
    out = np.empty(len(diff_forecast))

    for t, delta_d in enumerate(diff_forecast):
        delta = delta_d
        for k in range(d - 1, -1, -1):
            states[k] = states[k] + delta
            delta = states[k]
        out[t] = states[0]

    return out


def arma_residuals(x: np.ndarray, c: float, phi: np.ndarray, theta: np.ndarray) -> np.ndarray:
    x = np.asarray(x, dtype=float)
    phi = np.asarray(phi, dtype=float)
    theta = np.asarray(theta, dtype=float)
    p, q = len(phi), len(theta)

    eps = np.zeros_like(x)
    for t in range(len(x)):
        ar = 0.0
        for i in range(1, p + 1):
            if t - i >= 0:
                ar += phi[i - 1] * x[t - i]

        ma = 0.0
        for j in range(1, q + 1):
            if t - j >= 0:
                ma += theta[j - 1] * eps[t - j]

        eps[t] = x[t] - c - ar - ma

    return eps


def _arma_css_sse(params: np.ndarray, x: np.ndarray, p: int, q: int) -> float:
    c = params[0]
    phi = params[1 : 1 + p]
    theta = params[1 + p : 1 + p + q]
    eps = arma_residuals(x, c=c, phi=phi, theta=theta)
    start = max(p, q)
    return float(np.sum(eps[start:] ** 2))


def fit_arma_css(x: np.ndarray, p: int, q: int) -> dict:
    """Fit ARMA(p,q) to x via conditional sum of squares (CSS)."""
    x = np.asarray(x, dtype=float)

    # Initial guess: AR(p) via OLS (ignore MA), theta = 0.
    if p > 0:
        y_reg = x[p:]
        X_lags = np.column_stack([x[p - i - 1 : -(i + 1)] for i in range(p)])
        X = np.column_stack([np.ones(len(y_reg)), X_lags])
        beta, *_ = np.linalg.lstsq(X, y_reg, rcond=None)
        c0 = float(beta[0])
        phi0 = beta[1:]
    else:
        c0 = float(np.mean(x))
        phi0 = np.array([])

    theta0 = np.zeros(q)
    x0 = np.concatenate([[c0], phi0, theta0]).astype(float)

    bounds = [(None, None)] + [(-0.99, 0.99)] * (p + q)  # crude stationarity/invertibility guard
    res = minimize(_arma_css_sse, x0=x0, args=(x, p, q), method="L-BFGS-B", bounds=bounds)

    params = res.x
    c = float(params[0])
    phi = params[1 : 1 + p]
    theta = params[1 + p : 1 + p + q]
    eps = arma_residuals(x, c=c, phi=phi, theta=theta)

    start = max(p, q)
    sigma2 = float(np.mean(eps[start:] ** 2))

    return {
        "success": bool(res.success),
        "message": str(res.message),
        "c": c,
        "phi": phi,
        "theta": theta,
        "eps": eps,
        "sigma": float(np.sqrt(sigma2)),
        "sse": float(np.sum(eps[start:] ** 2)),
        "nobs": int(len(x)),
    }


def arma_forecast_mean(
    x_hist: np.ndarray,
    eps_hist: np.ndarray,
    *,
    c: float,
    phi: np.ndarray,
    theta: np.ndarray,
    h: int,
) -> np.ndarray:
    x_hist = np.asarray(x_hist, dtype=float)
    eps_hist = np.asarray(eps_hist, dtype=float)
    phi = np.asarray(phi, dtype=float)
    theta = np.asarray(theta, dtype=float)
    p, q = len(phi), len(theta)

    x = x_hist.tolist()
    eps = eps_hist.tolist()
    out = []

    for _ in range(h):
        ar = sum(phi[i] * x[-1 - i] for i in range(p)) if p else 0.0
        ma = sum(theta[j] * eps[-1 - j] for j in range(q)) if q else 0.0
        pred = c + ar + ma
        x.append(pred)
        eps.append(0.0)  # E[eps_{t+h}] = 0 for mean forecasts
        out.append(pred)

    return np.asarray(out)


def arma_simulate_future(
    x_hist: np.ndarray,
    eps_hist: np.ndarray,
    *,
    c: float,
    phi: np.ndarray,
    theta: np.ndarray,
    sigma: float,
    h: int,
    n_sims: int,
    rng: np.random.Generator,
) -> np.ndarray:
    """Simulate future paths of x_t conditional on history (for uncertainty bands)."""
    x_hist = np.asarray(x_hist, dtype=float)
    eps_hist = np.asarray(eps_hist, dtype=float)
    phi = np.asarray(phi, dtype=float)
    theta = np.asarray(theta, dtype=float)
    p, q = len(phi), len(theta)

    sims = np.zeros((n_sims, h))
    base_len = len(x_hist)

    for s in range(n_sims):
        x = np.concatenate([x_hist, np.zeros(h)])
        eps = np.concatenate([eps_hist, np.zeros(h)])

        for step in range(h):
            t = base_len + step
            ar = 0.0
            for i in range(1, p + 1):
                ar += phi[i - 1] * x[t - i]
            ma = 0.0
            for j in range(1, q + 1):
                ma += theta[j - 1] * eps[t - j]

            eps[t] = rng.normal(0.0, sigma)
            x[t] = c + ar + ma + eps[t]
            sims[s, step] = x[t]

    return sims
# Train/test split
n_train = 260
y_train = y[:n_train]
y_test = y[n_train:]
h = len(y_test)

order = (2, 1, 1)
p, d, q = order

x_train, last_values = difference_levels(y_train, d=d)
fit = fit_arma_css(x_train, p=p, q=q)

print("CSS optimization success:", fit["success"], "|", fit["message"])
print("Estimated c:", fit["c"])
print("Estimated phi:", fit["phi"])
print("Estimated theta:", fit["theta"])
print("Estimated sigma:", fit["sigma"])

# Mean forecast on differenced scale, then integrate back to original scale
x_fc_mean = arma_forecast_mean(x_train, fit["eps"], c=fit["c"], phi=fit["phi"], theta=fit["theta"], h=h)
y_fc_mean = undifference_forecast(x_fc_mean, last_values=last_values)

# Uncertainty via conditional simulation
n_sims = 2000
x_fc_sims = arma_simulate_future(
    x_train,
    fit["eps"],
    c=fit["c"],
    phi=fit["phi"],
    theta=fit["theta"],
    sigma=fit["sigma"],
    h=h,
    n_sims=n_sims,
    rng=rng,
)
y_fc_sims = np.vstack([undifference_forecast(x_fc_sims[i], last_values=last_values) for i in range(n_sims)])

q05 = np.quantile(y_fc_sims, 0.05, axis=0)
q50 = np.quantile(y_fc_sims, 0.50, axis=0)
q95 = np.quantile(y_fc_sims, 0.95, axis=0)
CSS optimization success: True | CONVERGENCE: RELATIVE REDUCTION OF F <= FACTR*EPSMCH
Estimated c: 0.19534310083057616
Estimated phi: [ 0.5284 -0.1357]
Estimated theta: [0.523]
Estimated sigma: 0.9380685254836366
x_train_dates = dates[:n_train]
x_test_dates = dates[n_train:]

fig = go.Figure()
fig.add_trace(go.Scatter(x=x_train_dates, y=y_train, mode="lines", name="Train"))
fig.add_trace(go.Scatter(x=x_test_dates, y=y_test, mode="lines", name="Test (actual)", line=dict(color="#444")))

# Confidence band
fig.add_trace(
    go.Scatter(
        x=x_test_dates,
        y=q95,
        mode="lines",
        name="90% interval",
        line=dict(width=0),
        showlegend=False,
    )
)
fig.add_trace(
    go.Scatter(
        x=x_test_dates,
        y=q05,
        mode="lines",
        line=dict(width=0),
        fill="tonexty",
        fillcolor="rgba(31, 119, 180, 0.20)",
        name="90% interval",
    )
)

fig.add_trace(go.Scatter(x=x_test_dates, y=y_fc_mean, mode="lines", name="Forecast (mean)", line=dict(color="#1f77b4")))

fig.update_layout(
    height=450,
    title=f"ARIMA{order} forecast (NumPy CSS) + simulated uncertainty",
    xaxis_title="Time",
    yaxis_title="Value",
)
fig.show()
# Confidence intuition: show a few sampled future paths + the widening band.
n_paths = 40
path_idx = rng.choice(n_sims, size=n_paths, replace=False)

fig = go.Figure()
fig.add_trace(go.Scatter(x=dates[:n_train], y=y_train, mode="lines", name="Train"))

for i in path_idx:
    fig.add_trace(
        go.Scatter(
            x=x_test_dates,
            y=y_fc_sims[i],
            mode="lines",
            line=dict(color="rgba(31,119,180,0.15)", width=1),
            showlegend=False,
        )
    )

fig.add_trace(go.Scatter(x=x_test_dates, y=q50, mode="lines", name="Median forecast", line=dict(color="#1f77b4", width=3)))
fig.add_trace(
    go.Scatter(x=x_test_dates, y=q95, mode="lines", line=dict(width=0), showlegend=False)
)
fig.add_trace(
    go.Scatter(
        x=x_test_dates,
        y=q05,
        mode="lines",
        line=dict(width=0),
        fill="tonexty",
        fillcolor="rgba(31, 119, 180, 0.20)",
        name="90% band",
    )
)

fig.update_layout(
    height=450,
    title="Forecast uncertainty intuition (fan of simulated futures)",
    xaxis_title="Time",
    yaxis_title="Value",
)
fig.show()

Typical real-world use cases#

ARIMA is a strong baseline when:

  • you have a single time series (univariate forecasting)

  • dynamics are mostly linear and captured by autocorrelation

  • non-stationarity is mostly trend / unit-root-like (handled by differencing)

Common examples:

  • demand / sales forecasting (after removing seasonality or using SARIMA)

  • call volume / ticket volume / website traffic forecasting

  • energy load and simple sensor forecasting (with careful outlier handling)

  • macroeconomic indicators (often with differencing and log transforms)

  • as a baseline in finance for returns/volatility proxies (with strong caveats)

When ARIMA may not be enough:

  • strong seasonality (use SARIMA: seasonal differencing + seasonal AR/MA)

  • external drivers (use ARIMAX / regression with ARMA errors)

  • multiple interacting series (VAR / state space)

  • non-linear patterns / regime shifts (consider richer models)

References#

  • Box, Jenkins, Reinsel, Ljung — Time Series Analysis: Forecasting and Control

  • Hyndman & Athanasopoulos — Forecasting: Principles and Practice